20240813 모각코 활동 7회차

오늘의목표
RESNET 실습 - CIFAR10 이미지 분류
양자화 공부

#필요한 모듈 불러오기

import torch
import torch.nn as nn #다양한 종류의 레이어 제공 -> 모델 만들기 도우미!
import torch.nn.functional as F #활성화 함수, 손실함수 등을 함수 형태로 제공.
import torch.backends.cudnn as cudnn

모델링

#BasicBlock 클래스 정의  
  
class BasicBlock(nn.Module): # nn.Module 상속받기  
    def __init__(self, in_planes, planes, stride = 1):  
        super(BasicBlock, self).__init__() #BasicBlock의 부모클래스인 nn.Module의 __init__함수를 먼저 호출해서 사용.  
                  
        #conv1과 conv2 설정  
        #2D 컨볼루션 레이어 설정  
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size = 3, stride = stride, padding = 1, bias = False) # in_planes 입력채널 수 / planes 출력채널 수 / kernel_size 3*3 필터(커널) 사용 / stride (커널로 훑을 때의 보폭) 기본값은 1 / padding 패딩의 크기 1 / bias = False 바이어스(출력값을 조절하기 위해 사용되는  값) 를 사용하지 않겠다. -> 바로 다음 줄의 코드(배치정규화)에서 바이어스의 역할을 해주기 때문에 여기에선 사용하지 않는다.  
        #배치 정규화 설정  
        self.bn1 = nn.BatchNorm2d(planes) # planes 배치정규화를 적용할 채널의 수. 앞의 출력 채널의 수와 동일해야함(당연함)  
  
        #2D 컨볼루션 레이어 설정  
        self.conv2 = nn.Conv2d(planes, planes, kernel_size = 3, stride = 1, padding = 1, bias = False)  
        #배치 정규화 설정  
        self.bn2 = nn.BatchNorm2d(planes)  
                    
# shortcut 설정 -> `H(x) = R(x) + x`에서의 x를 위한 작업  
        self.shortcut = nn.Sequential() # nn.Sequential : pytorch에서 여러 레이어들을 순서대로 쌓을 때 사용하는 도구 # x를 그대로 더할 수 있는 경우  
        if stride != 1: #stride의 값이 1인경우(입력과 출력의 채널 수가 다른 경우 = x를 그대로 더할 수 없는 경우)   
self.shortcut = nn.Sequential(  
                nn.Conv2d(in_planes, planes, kernel_size = 1, stride = stride, bias = False),  
                nn.BatchNorm2d(planes)  
            ) # nn.Sequential을 사용해서 Conv2d와 BatchNorm레이어들을 이어줬음  
                  
#순전파 함수 # __init__에서 설정해뒀던 거 실제로 사용하는 부분.  
    def forward(self,x):  
        out = F.relu(self.bn1(self.conv1(x))) #conv1 거치고, relu함수 거치기  
        out = self.bn2(self.conv2(out)) #그다음 conv2 거치기  
        out += self.shortcut(x) # resnet의 핵심인 skip connection : H(x) = R(x) + x
#ResNet 클래스 정의  
class ResNet(nn.Module):  
    def __init__(self, block, num_blocks, num_classes = 10):  
        super(ResNet, self).__init__() #ResNet의 부모클래스인 nn.Module의 __init__함수를 먼저 호출해서 사용.  
        self.in_planes = 64 # 입력 채널 수 64        # 2D 컨볼루션레이어 설정  
        self.conv1 = nn.Conv2d(3, 64, kernel_size = 3, stride = 1, padding = 1, bias = False) # 입력채널 수 3 / 출력채널 수 64 / kernel_size 3*3 필터(커널) 사용 / stride (커널로 훑을 때의 보폭) 1 / padding 패딩의 크기 1 / bias = False 바이어스(출력값을 조절하기 위해 사용되는  값) 를 사용하지 않겠다.  
        # 배치정규화 설정  
        self.bn1 = nn.BatchNorm2d(64) # 배치정규화를 위해 사용할 채널 수 = 이전 채널에서의 출력 채널 수 = 64        # 레이어블록 설정(각 블록은 앞서 정의한 BASIC BLOCK으로 구성될거임. 인자 block 자리에, BasicBlock이 들어갈거니까아아아~~)  
        # _make_layer() : (블록의 종류, 출력 채널 수, 쌓을 블럭의 수, 레이어의 첫 블럭에서 사용할 stride의 값)  
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride = 1) #  
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride = 2)  
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride = 2)  
        # self._make_layer()에서 self는 현재 클래스의 인스턴스를 가리킴.  
        # 클래스 예측값 계산  
        self.linear = nn.Linear(512, num_classes) # 입력 채널 수 512, 출력 채널 수 num_classes        # _make_layer 함수 설정  
    def _make_layer(self, block, planes, num_blocks, stride):  
        strides = [stride] + [1] * (num_blocks -1) # stride 값 설정 # 첫 번째 블록의 stride는 지정된 값을 사용하고 이후 블럭들은 stride = 1이 된다.  
        layers = [] # 블럭을 담을 빈 리스트 생성  
        for stride in strides:  
            layers.append(block(self.in_planes, planes, stride)) # 입력 채널 수 self.in_planes, 출력 채널 수 planes, 스트라이드 값 stride            self.in_planes = planes # 채널 수 변경해주기(다음 레이어를 위해)  
        return nn.Sequential(*layers) # 생성한 블록들을 하나의 레이어로 묶어서 반환.  
    # 순전파 함수 # __init__ 설정해뒀던거랑 _make_layer 함수 만든 거 실제로 사용하는 부분.  
    def forward(self, x):  
        out = F.relu(self.bn1(self.conv1(x)))  
        out = self.layer1(out)  
        out = self.layer2(out)  
        out = self.layer3(out)  
        out = self.layer4(out)  
        out = F.avg_pool2d(out, 4) # 풀링층  
        out = out.view(out.size(0),-1) # 텐서의 차원 변경  
        out = self.linear(out) #완전 연결층  
        return out
# ResNet 18 함수 정의  
def ResNet18():  
    return ResNet(BasicBlock, [2,2,2,2])

데이터 불러오기

import torchvision
import torchvision.transforms as transforms


transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])


transform_test = transforms.Compose([
    transforms.ToTensor(),
])

  
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)


train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100, shuffle=False, num_workers=2)

학습시키기

device = 'cuda'
net = ResNet18()
net = net.to(device)
learning_rate = 0.1
file_name = 'resnet18_cifar10.pth'
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9, weight_decay=0.0002)


def train(epoch):
    print('\n[ Train epoch: %d ]' % epoch)
    net.train()
    train_loss = 0
    correct = 0
    total = 0

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()

        optimizer.step()
        train_loss += loss.item()
        _, predicted = outputs.max(1)

        total += targets.size(0)
         current_correct = predicted.eq(targets).sum().item()
        correct += current_correct
        if batch_idx % 100 == 0:

            print('\nCurrent batch:', str(batch_idx))
            print('Current batch average train accuracy:', current_correct / targets.size(0))
            print('Current batch average train loss:', loss.item() / targets.size(0))

    print('\nTotal average train accuarcy:', correct / total)
    print('Total average train loss:', train_loss / total)


def test(epoch):
    print('\n[ Test epoch: %d ]' % epoch)
    net.eval()
    loss = 0
    correct = 0
    total = 0

  
    for batch_idx, (inputs, targets) in enumerate(test_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        total += targets.size(0)


        outputs = net(inputs)
        loss += criterion(outputs, targets).item()


        _, predicted = outputs.max(1)
        correct += predicted.eq(targets).sum().item()


    print('\nTotal average test accuarcy:', correct / total)
    print('Total average test loss:', loss / total)


    state = {
        'net': net.state_dict()
    }
    if not os.path.isdir('checkpoint'):
        os.mkdir('checkpoint')
    torch.save(state, './checkpoint/' + file_name)
    print('Model Saved!')



import time

def adjust_learning_rate(optimizer, epoch):
    lr = learning_rate
    if epoch >= 50:
        lr /= 10
    if epoch >= 100:
        lr /= 10
        
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
  
start_time = time.time()

for epoch in range(0, 150):
    adjust_learning_rate(optimizer, epoch)
    train(epoch)
    test(epoch)
    print('\nTime elapsed:', time.time() - start_time)

양자화 공부
양자화 : 실수형 변수(floating-point type)를 정수형 변수(integer or fixed point)로 변환하는 과정
양자화 하는 이유 : 인공지능 모델에 큰 비트수의 자료형을 사용 -> 학습 과정에서 계산량과 필요한 메모리 크기 등이 커지게 됨. -> 학습을 시키기 위해 많은 리소스가 필요해지고, 추론도 오래 걸리는 문제가 발생. 양자화를 통하여 효과적인 모델 최적화를 할 수 있는데, float 타입을 int형으로 줄이면서 용량을 줄일 수 있고 bit 수를 줄임으로써 계산 복잡도도 줄일 수 있음
Pipeline
-HuggingFace의 가장 기본 기능으로, 자연어 처리 작업, inference(추론)을 빠르게 할 수 있게 해준다.
-(hugging face에 대한 내용은 처음 보낸 코랩 파일 가장 위에 있으니 더 알아보고싶으시면 참고하시면 됩니다!)
-pretrained model(사전학습 모델)을 사용하는 가장 쉬운 방법.
-사전학습모델이란 : 예를 들어 텍스트 유사도 예측 모델을 만들기 위해서, 감정 분석 문제를 학습했던 모델의 가중치를 활용하는 방법. 즉, 감정 분석 문제를 학습하면서 얻은 언어에 대한 이해를 텍스트 유사도 문제를 학습하는 데 활용하는 방식이다.
pipeline(task, model, config, tokenizer, feature_extractor, framework, revision, use_fast, use_auth_token, model_kwargs, pipeline_class, kwargs) 매개변수 설명**
-task : 어떤 작업을 할것인가? -> 여기에서는 'text-generation' 텍스트 생성 작업을 할거임. ( 그 외 question-answering, translation 등등이 있음 ) 이건 pipeline을 사용할 때 꼭 지정해주어야 함. 나머지것들은 기본으로 지정된 것들이 있기 때문에 따로 필요한 경우만 지정해주면 됨.
-model : 어떤 모델을 사용할것인가? -> 여기에서는 "meta-llama/Meta-Llama-3-8B-Instruct" 라는 hugging face에서 미리 가져온 모델을 사용.
-device map : 모델이 어디서(GPU 또는 CPU) 실행되어야할까? -> 여기에서는 "auto" 로, 현재 기기에서 사용가능한 장소를 자동으로 감지하고, GPU가 있다면 이를 우선적으로 사용
-model_kwargs : 추가로 전달할 매개변수(예를 들어 특정 설정을 변경하는 경우 사용) -> 여기에서는 {"quantization_config": quantization_config} 이라는 quantization(양자화) 에 대한 설정을 포함하구 있음.

#준비
!pip install bitsandbytes # 양자화 기법을 사용할 수 있게 해주는 파이썬 모듈 다운로드
!pip install -U bitsandbytes
from transformers import pipeline, BitsAndBytesConfig # BitsAndBytesConfig 허깅페이스에서 양자화를 위한 라이브러리

  

#허깅페이스 로그인("meta-llama/Meta-Llama-3-8B-Instruct"를 사용하기 위함)
from huggingface_hub import login
login("내 TOKEN")

  

#양자화 옵션 설정
#4bit로 되어있긴 하지만, 8bit도 가능.

quantization_config = BitsAndBytesConfig(load_in_4bit=True)  # You can also try load_in_8bit
pipe = pipeline("text-generation", "meta-llama/Meta-Llama-3-8B-Instruct", device_map="auto", model_kwargs={"quantization_config": quantization_config})



#양자화 한 후 실행
chat = [
    {"role": "system", "content": "You are a sassy, wise-cracking robot as imagined by Hollywood circa 1986."},
    {"role": "user", "content": "Hey, can you tell me any fun things to do in New York?"}
]
response = pipe(chat, max_new_tokens=512)
print(response[0]['generated_text'][-1]['content'])
chat.append(
    {"role": "user", "content": "Wait, what's so wild about soup cans?"}
)
response = pipe(chat, max_new_tokens=512)
print(response[0]['generated_text'][-1]['content'])